package com.think.cloud.thinkshop.mall.rabbit.consumer;


import cn.hutool.json.JSONUtil;
import com.think.cloud.thinkshop.common.enums.RabbitMessageTypeEnum;
import com.think.cloud.thinkshop.common.enums.operationplan.OperationPlanStatusEnum;
import com.think.cloud.thinkshop.mall.domain.memberuser.MemberOperationPlan;
import com.think.cloud.thinkshop.mall.service.coupon.IProductCouponService;
import com.think.cloud.thinkshop.mall.service.memberuser.IMemberGroupService;
import com.think.cloud.thinkshop.mall.service.memberuser.IMemberOperationPlanService;
import com.think.cloud.thinkshop.mall.service.order.AppOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

import static com.think.cloud.thinkshop.mall.rabbit.config.RabbitConfig.DELAY_QUEUE_NAME;
import static com.think.cloud.thinkshop.mall.rabbit.config.RabbitConfig.QUEUE_NAME;

@Component
@Slf4j
public class MessageConsumer {
    @Autowired
    private AppOrderService orderService;
    @Autowired
    private IMemberGroupService memberGroupService;
    @Autowired
    private IProductCouponService productCouponService;
    @Autowired
    private IMemberOperationPlanService memberOperationPlanService;

    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
    public void handleMessage(@Payload Message message, com.rabbitmq.client.Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            // 处理消息逻辑
            String messageType = message.getMessageProperties().getType();
            String msg = new String(message.getBody());
            log.info("=========================》消费消息: 类型：{},msg:{} =================", messageType, msg);
            switch (RabbitMessageTypeEnum.toEnum(messageType)) {
                case MEMBER_GROUP_CONFIG_INSERT_OR_UPDATE:
                    memberGroupService.memberGroupUserUpdate(msg);
                    break;
                case MEMBER_GROUP_UPDATE_BY_USER_ACTION:
                    memberGroupService.memberGroupUserUpdateByUserId(Long.valueOf(msg));
                    break;
                case OPERATION_PLAN_EXECUTE:
                    memberOperationPlanService.execute(JSONUtil.toBean(msg, MemberOperationPlan.class));
                    break;
                default:
                    log.error("消息类型错误：{}", JSONUtil.toJsonStr(message));
            }
            // 成功处理后确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 处理失败，可以选择重新入队或拒绝消息
//            channel.basicReject(tag, true); // 第二个参数requeue表示是否重新入队
        }
    }

    @RabbitListener(queues = DELAY_QUEUE_NAME)
    public void handleDelayMessage(Message message, com.rabbitmq.client.Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 处理消息逻辑
            String messageType = message.getMessageProperties().getType();
            String msg = new String(message.getBody());
            log.info("消费延迟消息: {},{}", messageType, msg);
            switch (RabbitMessageTypeEnum.toEnum(messageType)) {
                case DELAY_ORDER_CANCEL_HOUR:
                    orderService.cancel(Long.valueOf(msg));
                    break;
                case DELAY_ORDER_AUTOMATIC_RECEIPT_DAYS:
                    orderService.receipt(Long.valueOf(msg));
                    break;
                case DELAY_ORDER_AFTER_SALE_CLOSE:
                    orderService.sendIntegral(Long.valueOf(msg));
                    break;
                case DELAY_ORDER_AFTER_SALE_AUTOMATIC_CANCELLATION:
                    break;
                case DELAY_COUPON_START:
                case DELAY_COUPON_END:
                    productCouponService.autoSwitch(Long.valueOf(msg));
                    break;
                case DELAY_OPERATION_PLAN_EXECUTE_AUTO_START:
                    memberOperationPlanService.updateStatus(JSONUtil.toBean(msg, MemberOperationPlan.class), OperationPlanStatusEnum.IN_PROGRESS);
                    break;
                case DELAY_OPERATION_PLAN_EXECUTE_AUTO_END:
                    memberOperationPlanService.updateStatus(JSONUtil.toBean(msg, MemberOperationPlan.class), OperationPlanStatusEnum.END);
                    break;
                case DELAY_OPERATION_PLAN_EXECUTE_MANUAL_START:
                    memberOperationPlanService.doExecute(JSONUtil.toBean(msg, MemberOperationPlan.class));
                    break;
                case DELAY_ORDER_WAIT_PAY_NOTICE:
                    orderService.waitPayNotice(Long.valueOf(msg));
                    break;
                case DELAY_ORDER_AUTO_RECEIPT_NOTICE:
                    orderService.autoNoticeBeforeReceipt(Long.valueOf(msg));
                    break;
                default:
                    log.error("消息类型错误：{}", JSONUtil.toJsonStr(message));
            }
            // 成功处理后确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 处理失败，可以选择重新入队或拒绝消息
//            channel.basicReject(tag, true); // 第二个参数requeue表示是否重新入队
        }
    }

}
